Plume
Typed async events with sequenced and parallel dispatching
npm i @hazae41/plume
Node Package 📦
Features
Current features
- 100% TypeScript and ESM
- No external dependency
- Rust-like patterns
- Type-safe event dispatching and listening
- Event listeners can return values
- Sequenced and parallel dispatching
- Wait for events with composition
Usage
Emitters
type MyEvents = {
request: (data: string) => string,
close: (reason?: unknown) => void,
error: (reason?: unknown) => void,
}
class MyObject {
readonly events = new SuperEventTarget<MyEvents>()
async onError(reason?: unknown) {
await this.events.emit("error", reason)
}
async onClose() {
await this.event.emit("close")
}
async request(data: string): string {
const response = await this.events.emit("request", data)
if (response.isSome())
return response.get()
throw new Error(`Unhandled`)
}
}
Listeners
const object = new MyObject()
object.on("request", (request: string) => {
if (request === "hello")
return new Some("world")
return new None()
})
object.on("request", (request: string) => {
if (request === "it")
return new Some("works")
return new None()
})
object.on("request", (request: string) => {
if (request === "have")
return new Some("fun")
return new None()
})
Sequenced dispatching (default)
You can use sequenced listening using passive: false
(or passive: undefined
)
The listeners will be called one after the other
When a listener returns something, it will skip all other listeners
for (const listener of listeners) {
const returned = await listener(...)
if (returned.isSome())
return returned
continue
}
return new None()
myObject.events.on("message", async (message: string) => {
await doSometing(message)
return new Some(1)
}, { passive: false })
myObject.events.on("message", async (message: string) => {
await doSometing2(message)
return new Some(2)
}, { passive: false })
console.log(await myObject.emit("message", "hello world"))
Parallel dispatching
Parallel listening using passive: true
Both listeners will be called at the same time
Their result will be retrieved with Promise.all
const promises = new Array<Promise<...>>()
for (const listener of listeners)
promises.push(listener(...))
const returneds = await Promise.all(promises)
for (const returned of returneds)
if (returned.isSome())
return returned
return new None()
myObject.events.on("message", async (message: string) => {
await doSometing(message)
return new Some(1)
}, { passive: true })
myObject.events.on("message", async (message: string) => {
await doSometing(e.data)
return new Some(2)
}, { passive: true })
console.log(await myObject.emit("message", "hello world"))
Waiting for an event
In this example we have a target with a send()
method and a message
event
We want to send a message with some ID and wait for a reply with the same ID, skipping replies with other ID
Waiting is always done using passive: true
import { Future } from "@hazae41/future"
async function requestAndWait(id: number, request: string): Promise<string> {
const socket = new MySocket()
socket.send({ id, text: request })
const response = await socket.wait("message", async (future: Future<string>, message) => {
if (message.id === id) {
future.resolve(message.text)
return new None()
}
return new None()
})
return response
}
Composing waiters with automatic disposal
Same as above but this time the event is raced with other events in a composable way
When one event is resolved or rejected, it will stop listening to the other (it is disposed by the using
keyword)
import { Future } from "@hazae41/future"
async function requestAndWaitOrClose(id: number, request: string): Promise<string> {
const socket = new MySocket()
socket.send({ id, text: request })
using event = socket.wait("message", async (future: Future<string>, message) => {
if (message.id === id) {
future.resolve(message.text)
return new None()
}
return new None()
})
using close = socket.wait("close", (future: Future<never>) => {
future.reject(new Error("Closed"))
return new None()
})
return await Promise.race([event, close])
}
Plume provides some helper functions for doing this with fewer lines of code
import { Future } from "@hazae41/future"
async function requestAndWaitOrCloseOrErrorOrSignal(id: number, request: string, signal: AbortSignal): Promise<string> {
const socket = new MySocket()
socket.send({ id, text: request })
using event = socket.wait("message", async (future: Future<string>, message) => {
if (message.id === id) {
future.resolve(message.text)
return new None()
}
return new None()
})
using abort = Plume.AbortedError.waitOrThrow(signal)
using error = Plume.ErroredError.waitOrThrow(socket)
using close = Plume.ClosedError.waitOrThrow(socket)
return await Promise.race([event, close, error, abort])
}
And it provides helpers for common error-close-signal patterns
import { Future } from "@hazae41/future"
async function requestAndWaitOrCloseOrErrorOrSignal(id: number, request: string, signal: AbortSignal): Promise<string> {
const socket = new MySocket()
socket.send({ id, text: request })
const response = await Plume.waitOrCloseOrErrorOrSignal(socket, "message", async (future: Future<string>, message) => {
if (message.id === id) {
future.resolve(message.text)
return new None()
}
return new None()
}, signal)
return response
}